RocketMq

您所在的位置:网站首页 rocketmq 创建监听 RocketMq

RocketMq

2023-09-10 14:44| 来源: 网络整理| 查看: 265

单产多费(1) 注意:不同的组之间,订阅的topic一样,每组收到的消息数一样多且无争抢,类似广播。

生产者:

//1.创建一个发送消息的对象Producer DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.设定发送的命名服务器地址 producer.setNamesrvAddr("localhost:9876"); //3.1启动发送的服务 producer.start(); for (int i = 0; i @Override public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //遍历消息 for (MessageExt msg : list) { System.out.println("收到消息:"+msg); System.out.println("消息是:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //4.启动接收消息的服务 consumer.start(); System.out.println("接受消息服务已经开启!"); //5 不要关闭消费者! 单产多费(2)实现组内广播:只需改消费者配置:

消费者(广播模式,★★同一个组内,每个消费者都得到生产者发来的全部消息):

//1.创建一个接收消息的对象Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); ★★★组名 //2.设定接收的命名服务器地址 consumer.setNamesrvAddr("localhost:9876"); //3.设置接收消息对应的topic,对应的sub标签为任意 consumer.subscribe("topic1","*"); //设置当前消费者的消费模式(默认模式:负载均衡) //consumer.setMessageModel(MessageModel.CLUSTERING); ★★★ 设置当前消费者的消费模式(广播模式) consumer.setMessageModel(MessageModel.BROADCASTING); //3.开启监听,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //遍历消息 for (MessageExt msg : list) { System.out.println("收到消息:"+msg); System.out.println("消息是:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //4.启动接收消息的服务 consumer.start(); System.out.println("接受消息服务已经开启!"); //5 不要关闭消费者! 多生产者(见集群篇): 消息类型: 1.同步消息:

上面的代码都是同步消息: SendResult result = producer.send(msg); 特征:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)

2.异步消息:

特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息。(一股脑把消息全发送出去,但是回执传回的速度不做要求)

生产者代码: 详细见注释

//1.创建一个发送消息的对象Producer DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.设定发送的命名服务器地址 producer.setNamesrvAddr("localhost:9876"); //3.1启动发送的服务 producer.start(); for (int i = 0; i //表示成功返回结果 @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } //表示发送消息失败 @Override public void onException(Throwable throwable) { System.out.println(throwable); } }); System.out.println("消息"+i+"发完了,做业务逻辑去了!"); } ★★★★★休眠10秒 若不休眠,则send方法还没执行完,下面就关闭连接了,就报错了 TimeUnit.SECONDS.sleep(10); //5.关闭连接 producer.shutdown(); 3.单项消息(渣男):

特征:不需要回执,也不管对方是否收到的消息,例如日志类消息 代码: producer.sendOneway(msg);

4.延时消息

消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用

Message msg = new Message("topic3",("延时消息:hello rocketmq "+i).getBytes("UTF-8")); //设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel) msg.setDelayTimeLevel(3); SendResult result = producer.send(msg); System.out.println("返回结果:"+result);

目前支持的消息时间等级划分: private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;

msg.setDelayTimeLevel(3):代表延时10s so,我们可以分别设置每条消息的不通延时时间

5.批量消息

批量发送消息能显著提高传递小消息的性能.

List msgList = new ArrayList(); Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF-8")); Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF-8")); Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF-8")); msgList.add(msg1); msgList.add(msg2); msgList.add(msg3); SendResult result = producer.send(msgList);

限制: 1这些批量消息应该有相同的topic 2相同的waitStoreMsgOK(消息类型也尽量一样) 3不能是延时消息 4消息内容总长度不超过4M

​ 消息内容总长度包含如下:

​ topic(字符串字节数)​ body (字节数组长度)​ 消息追加的属性(key与value对应字符串字节数)​ 日志(固定20字节) 消息过滤/sql过滤:

在消费者集群中设置,比如两台机器接受vip用户的消息,两台接收normal用户的消息。

生产者:

public class Producer { public static void main(String[] args) throws Exception { //1. 谁来发? DefaultMQProducer producer = new DefaultMQProducer("group1"); //2. 发给谁? producer.setNamesrvAddr("localhost:9876"); producer.start(); //3. 怎么发? //4. 发什么? String msg="hello world yuandongli"; Message message = new Message("topic8", "vip", msg.getBytes()); ★★★消息追加sql属性: message.putUserProperty("name","zhangsan"); message.putUserProperty("age", "15"); SendResult sendResult = producer.send(message); //5. 发的结果是什么? System.out.println(sendResult); //6. 打扫战场 producer.shutdown(); } }

消费者:

public class Consumer { public static void main(String[] args) throws Exception { //1. 谁来收? DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2. 从哪里收消息? consumer.setNamesrvAddr("localhost:9876"); ★★★ //3. 监听哪个消息队列 按照tag过滤 //consumer.subscribe("topic8","tag1 || vip"); ★★★ //消费者 sql过滤 想要成年的消息 consumer.subscribe("topic8", MessageSelector.bySql("age > 16 and name='zhangsan'")); //4. 处理业务流程 注册监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { //写我们的业务逻辑 for (MessageExt msg : msgs) { System.out.println(msg); byte[] body = msg.getBody(); System.out.println(new String(body)); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("消费者起起来了"); //千万别关消费者 } } 注意:sql过滤要修改配置文件再启动!!!

SQL过滤需要依赖服务器的功能支持,在broker.conf配置文件中添加对应的功能项,并开启对应功能

enablePropertyFilter=true

重启broker

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

或者直接cmd中输入

mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue

页面查看开启与否



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3